import 'dart:async';

import 'package:async/async.dart';

import '../common/test_page.dart';

/// Returns a view of [base] in which the event equal to [errorValue] is
/// replaced by [error].
///
/// All other events pass through unchanged. The replacement is done by
/// throwing [error] from the `map` callback.
Stream streamError(Stream base, int errorValue, Object error) {
  return base.map((event) {
    if (event == errorValue) {
      throw error;
    }
    return event;
  });
}

/// Builds a stream that emits the elements of [iterable] one per timer tick.
///
/// Every call increments the top-level [ctr] and derives the tick interval
/// from it as `(ctr * 5) % 7 + 3` milliseconds, so consecutive calls get
/// different intervals. The periodic timer is started right away, without
/// waiting for a listener, and is cancelled only after [iterable] has been
/// exhausted, at which point the stream is closed.
Stream<T> mks<T>(Iterable<T> iterable) {
  final source = iterable.iterator;
  final sink = StreamController<T>();
  ctr += 1;
  final intervalMs = (ctr * 5) % 7 + 3;
  Timer.periodic(Duration(milliseconds: intervalMs), (tick) {
    if (!source.moveNext()) {
      // Nothing left to emit: finish the stream and stop ticking.
      sink.close();
      tick.cancel();
      return;
    }
    sink.add(source.current);
  });
  return sink.stream;
}

/// Call counter for [mks].
///
/// [mks] increments this on every call and uses the new value to compute the
/// timer interval of the stream it returns.
int ctr = 0;

/// Test page that registers a series of `StreamZip` scenarios.
///
/// Every case is registered from the constructor body through `test(...)`,
/// and observed values are handed to a one-argument `expect(...)`. Neither
/// function is defined in this file — presumably both come from [TestPage];
/// confirm there.
class StreamZipTestPage extends TestPage {
  /// Creates the page with the given title and registers all cases below.
  StreamZipTestPage(super.title) {
    // Zips [streams], collects every emitted event into `data` and, once the
    // zipped stream is done, passes `data` to `expect`.
    // NOTE(review): [expectedData] is never read in this body, so the
    // expected values supplied by the callers are not compared here.
    void testZip(Iterable<Stream> streams, Iterable expectedData) {
      var data = [];
      Stream zip = StreamZip(streams);
      zip.listen(data.add, onDone: () {
        expect(data);
      });
    }

    // Basic case: three sources of equal length.
    test('基本的StreamZip()', () {
      testZip([
        mks([1, 2, 3]),
        mks([4, 5, 6]),
        mks([7, 8, 9])
      ], [
        [1, 4, 7],
        [2, 5, 8],
        [3, 6, 9]
      ]);
    });

    // Uneven lengths 1: the first source has two extra events (99, 100).
    test('StreamZip()长度不均匀1', () {
      testZip([
        mks([1, 2, 3, 99, 100]),
        mks([4, 5, 6]),
        mks([7, 8, 9])
      ], [
        [1, 4, 7],
        [2, 5, 8],
        [3, 6, 9]
      ]);
    });

    // Uneven lengths 2: the second source has two extra events.
    test('StreamZip()长度不均匀2', () {
      testZip([
        mks([1, 2, 3]),
        mks([4, 5, 6, 99, 100]),
        mks([7, 8, 9])
      ], [
        [1, 4, 7],
        [2, 5, 8],
        [3, 6, 9]
      ]);
    });

    // Uneven lengths 3: the third source has two extra events.
    test('StreamZip()长度不均匀3', () {
      testZip([
        mks([1, 2, 3]),
        mks([4, 5, 6]),
        mks([7, 8, 9, 99, 100])
      ], [
        [1, 4, 7],
        [2, 5, 8],
        [3, 6, 9]
      ]);
    });

    // Uneven lengths 4: the first and third sources are both longer than the
    // second one.
    test('StreamZip()长度不均匀4', () {
      testZip([
        mks([1, 2, 3, 98]),
        mks([4, 5, 6]),
        mks([7, 8, 9, 99, 100])
      ], [
        [1, 4, 7],
        [2, 5, 8],
        [3, 6, 9]
      ]);
    });

    // Empty 1: the first source has no events.
    test('StreamZip()空1', () {
      testZip([
        mks([]),
        mks([4, 5, 6]),
        mks([7, 8, 9])
      ], []);
    });

    // Empty 2: the second source has no events.
    test('StreamZip()空2', () {
      testZip([
        mks([1, 2, 3]),
        mks([]),
        mks([7, 8, 9])
      ], []);
    });

    // Empty 3: the third source has no events.
    test('StreamZip()空3', () {
      testZip([
        mks([1, 2, 3]),
        mks([4, 5, 6]),
        mks([])
      ], []);
    });

    // Empty source list: no streams are passed to StreamZip at all.
    test('StreamZip()空源', () {
      testZip([], []);
    });

    // Single source: one stream only.
    test('StreamZip()单一来源', () {
      testZip([
        mks([1, 2, 3])
      ], [
        [1],
        [2],
        [3]
      ]);
    });

    // Other stream kinds: a filtered mks stream, a periodic stream limited to
    // three events, and a broadcast controller stream.
    test('StreamZip()其他流', () {
      var st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4);
      Stream st2 = Stream.periodic(const Duration(milliseconds: 5), (x) => x + 4).take(3);
      var c = StreamController.broadcast();
      var st3 = c.stream;
      testZip([
        st1,
        st2,
        st3
      ], [
        [1, 4, 7],
        [2, 5, 8],
        [3, 6, 9]
      ]);
      // The broadcast events are added only after testZip has been called.
      c
        ..add(7)
        ..add(8)
        ..add(9)
        ..close();
    });

    // Error 1: the first source yields 'BAD-1' in place of the value 2. The
    // Future returned by toList() is passed to expect as-is.
    test('StreamZip()错误1', () {
      expect(
          StreamZip([
            streamError(mks([1, 2, 3]), 2, 'BAD-1'),
            mks([4, 5, 6]),
            mks([7, 8, 9])
          ]).toList());
    });

    // Error 2: the second source yields 'BAD-2' in place of the value 5.
    test('StreamZip()错误2', () {
      expect(
          StreamZip([
            mks([1, 2, 3]),
            streamError(mks([4, 5, 6]), 5, 'BAD-2'),
            mks([7, 8, 9])
          ]).toList());
    });

    // Error 3: the third source yields 'BAD-3' in place of the value 8.
    test('StreamZip()错误3', () {
      expect(
          StreamZip([
            mks([1, 2, 3]),
            mks([4, 5, 6]),
            streamError(mks([7, 8, 9]), 8, 'BAD-3')
          ]).toList());
    });

    // Error at end: the error replaces the last value (6) of the second
    // source.
    test('StreamZip()结束时出错', () {
      expect(
          StreamZip([
            mks([1, 2, 3]),
            streamError(mks([4, 5, 6]), 6, 'BAD-4'),
            mks([7, 8, 9])
          ]).toList());
    });

    // Error before first end: the first source errors on its fourth element,
    // while the other two are controller streams with three added events.
    // NOTE(review): those two controllers are never closed here.
    test('StreamZip()第一个结束前出现错误', () {
      expect(
          StreamZip([
            streamError(mks([1, 2, 3, 4]), 4, 'BAD-5'),
            (StreamController()
              ..add(4)
              ..add(5)
              ..add(6))
                .stream,
            (StreamController()
              ..add(7)
              ..add(8)
              ..add(9))
                .stream
          ]).toList());
    });

    // Error after first end: an error is pushed into the third source only
    // after one of the first two sources has reported done.
    test('StreamZip()第一端后出现错误', () {
      var controller = StreamController();
      controller
        ..add(7)
        ..add(8)
        ..add(9);
      // On the done event of the transformed stream, schedule 'BAD-6' on
      // `controller` via Timer.run and then close the transformed sink.
      // NOTE(review): `controller` itself is never closed here.
      var trans =
      StreamTransformer<int, int>.fromHandlers(handleDone: (EventSink s) {
        Timer.run(() {
          controller.addError('BAD-6');
        });
        s.close();
      });
      testZip([
        mks([1, 2, 3]).transform(trans),
        mks([4, 5, 6]).transform(trans),
        controller.stream
      ], [
        [1, 4, 7],
        [2, 5, 8],
        [3, 6, 9]
      ]);
    });

    // Pause/resume bookkeeping on the source controllers while the zipped
    // stream is consumed step by step through a StreamIterator.
    test('Pause/Resume', () {
      // sc1p / sc2p: incremented in onPause and decremented in onResume of
      // the respective controller.
      var sc1p = 0;
      var c1 = StreamController(onPause: () {
        sc1p++;
      }, onResume: () {
        sc1p--;
      });

      var sc2p = 0;
      var c2 = StreamController(onPause: () {
        sc2p++;
      }, onResume: () {
        sc2p--;
      });

      // Reports both pause counters; called after the last moveNext() below.
      done() {
        expect(sc1p);
        expect(sc2p);
      }

      Stream zip = StreamZip([c1.stream, c2.stream]);

      const ms25 = Duration(milliseconds: 25);

      var it = StreamIterator(zip);

      // After each step, report hasMore and the current value, then feed c2
      // its next event: immediately for 6 and 9, after a 25 ms delay for 8.
      it.moveNext().then((hasMore) {
        expect(hasMore);
        expect(it.current);
        return it.moveNext();
      }).then((hasMore) {
        expect(hasMore);
        expect(it.current);
        c2.add(6);
        return it.moveNext();
      }).then((hasMore) {
        expect(hasMore);
        expect(it.current);
        Future.delayed(ms25).then((_) {
          c2.add(8);
        });
        return it.moveNext();
      }).then((hasMore) {
        expect(hasMore);
        expect(it.current);
        c2.add(9);
        return it.moveNext();
      }).then((hasMore) {
        expect(hasMore);
        done();
      });

      // Initial events: c1 gets all four values and is closed; c2 gets only
      // its first two values up front.
      c1
        ..add(1)
        ..add(3)
        ..add(5)
        ..add(7)
        ..close();
      c2
        ..add(2)
        ..add(4);
    });

    // Pausing and resuming the subscription on the zipped stream itself.
    test('pause-resume2', () {
      var s1 = Stream.fromIterable([0, 2, 4, 6, 8]);
      var s2 = Stream.fromIterable([1, 3, 5, 7]);
      var sz = StreamZip([s1, s2]);
      // Counts delivered events. NOTE(review): this local shadows the
      // top-level `ctr` used by mks.
      var ctr = 0;
      late StreamSubscription sub;
      sub = sz.listen((v) {
        expect(v);
        if (ctr == 1) {
          // Second event: pause, passing a 25 ms delayed future as the
          // resume signal.
          sub.pause(Future.delayed(const Duration(milliseconds: 25)));
        } else if (ctr == 2) {
          // Third event: pause without a signal and resume explicitly after
          // 25 ms.
          sub.pause();
          Future.delayed(const Duration(milliseconds: 25)).then((_) {
            sub.resume();
          });
        }
        ctr++;
      });
    });
  }

}
